Representing multiple values

                  // Returns the integers 1, 2, 3 as an eagerly built list.
fun simple(): List<Int> {
    return (1..3).toList()
}
 
fun main() {
    // Print every element returned by simple(), one per line.
    for (value in simple()) {
        println(value)
    }
}
                
                    // Returns the integers 1, 2, 3 as a read-only list.
fun simple(): List<Int> = listOf(1, 2, 3)
 
fun main() {
    // Print each element of simple() on its own line.
    simple().forEach { value -> println(value) }
}
                  

Sequences

                  // Lazily yields 1, 2, 3, blocking the calling thread for 100 ms before each value.
fun simple(): Sequence<Int> = sequence { // sequence builder
    repeat(3) { index ->
        Thread.sleep(100) // pretend we are computing it
        yield(index + 1) // yield next value
    }
}
// Entry point: iterates the sequence from simple() and prints each value on its own line.
fun main() {
    simple().forEach { value -> println(value) }
}
                
                    // Lazily yields 1, 2, 3, sleeping 100 ms on the calling thread before each value.
fun simple(): Sequence<Int> = sequence { // sequence builder
    for (i in 1..3) {
        Thread.sleep(100) // pretend we are computing it
        yield(i) // yield next value
    }
}
// Entry point: walks the sequence from simple() and prints each value on its own line.
fun main() {
    simple().forEach { value -> println(value) }
}
                  

Suspending functions

                  // Suspends for one second, then returns the whole list 1, 2, 3 at once.
suspend fun simple(): List<Int> {
    delay(1000) // pretend we are doing something asynchronous here
    return listOf(1, 2, 3)
}

// Calls simple() inside runBlocking and prints each returned element.
fun main() = runBlocking<Unit> {
    simple().forEach { value -> println(value) }
}
                
                    // Suspends for one second, then returns the list 1, 2, 3 in a single result.
suspend fun simple(): List<Int> {
    delay(1000) // pretend we are doing something asynchronous here
    return listOf(1, 2, 3)
}

// Calls simple() inside runBlocking and prints each returned element on its own line.
fun main() = runBlocking<Unit> {
    simple().forEach { value -> println(value) }
}
                  

Flows

                  // Builds a flow that emits 1, 2, 3 with a 100 ms suspending delay before each emission.
fun simple(): Flow<Int> = flow { // flow builder
    for (i in 1..3) {
        delay(100) // pretend we are doing something useful here
        emit(i) // emit next value
    }
}

// Collects the flow while a concurrently launched coroutine prints its own progress messages.
fun main() = runBlocking<Unit> {
    // Launch a concurrent coroutine to check if the main thread is blocked
    launch {
        for (k in 1..3) {
            println("I'm not blocked $k")
            delay(100)
        }
    }
    // Collect the flow
    simple().collect { value -> println(value) }
}
                
                    // Builds a flow emitting 1, 2, 3, suspending for 100 ms before each emission.
fun simple(): Flow<Int> = flow { // flow builder
    for (i in 1..3) {
        delay(100) // pretend we are doing something useful here
        emit(i) // emit next value
    }
}

// Collects the flow while a second coroutine, launched first, prints progress lines.
fun main() = runBlocking<Unit> {
    // Launch a concurrent coroutine to check if the main thread is blocked
    launch {
        for (k in 1..3) {
            println("I'm not blocked $k")
            delay(100)
        }
    }
    // Collect the flow
    simple().collect { value -> println(value) }
}
                  

Flows are cold

                  // Flow that prints "Flow started" when its body runs, then emits 1, 2, 3 at 100 ms intervals.
fun simple(): Flow<Int> = flow {
    println("Flow started")
    for (i in 1..3) {
        delay(100)
        emit(i)
    }
}

// Creates the flow once and collects it twice, printing a marker before each step.
fun main() = runBlocking<Unit> {
    println("Calling simple function...")
    val flow = simple()
    println("Calling collect...")
    flow.collect { value -> println(value) }
    println("Calling collect again...")
    flow.collect { value -> println(value) }
}
                
                    // Flow that prints "Flow started" when its body runs, then emits 1, 2, 3 with 100 ms delays.
fun simple(): Flow<Int> = flow {
    println("Flow started")
    for (i in 1..3) {
        delay(100)
        emit(i)
    }
}

// Obtains the flow once, then collects it two times with a marker line before each step.
fun main() = runBlocking<Unit> {
    println("Calling simple function...")
    val flow = simple()
    println("Calling collect...")
    flow.collect { value -> println(value) }
    println("Calling collect again...")
    flow.collect { value -> println(value) }
}
                  

Flow cancellation basics

                  // Flow that waits 100 ms, logs "Emitting i", and emits i for i in 1..3.
fun simple(): Flow<Int> = flow {
    for (i in 1..3) {
        delay(100)
        println("Emitting $i")
        emit(i)
    }
}

// Collects the flow under a 250 ms timeout, then prints "Done" regardless of the outcome.
fun main() = runBlocking<Unit> {
    withTimeoutOrNull(250) { // Timeout after 250ms
        simple().collect { value -> println(value) }
    }
    println("Done")
}
                
                    // Flow that, for i in 1..3, delays 100 ms, prints "Emitting i", then emits i.
fun simple(): Flow<Int> = flow {
    for (i in 1..3) {
        delay(100)
        println("Emitting $i")
        emit(i)
    }
}

// Collects the flow inside withTimeoutOrNull(250) and prints "Done" afterwards.
fun main() = runBlocking<Unit> {
    withTimeoutOrNull(250) { // Timeout after 250ms
        simple().collect { value -> println(value) }
    }
    println("Done")
}
                  

Flow builders

                  // Turn the range 1..3 into a flow and print every element it emits
(1..3)
    .asFlow()
    .collect { println(it) }
                
                    // Convert an integer range to a flow and print each element on its own line
(1..3).asFlow().collect { value -> println(value) }
                  

Intermediate flow operators

                  // Suspends for one second and returns the text "response <request>".
suspend fun performRequest(request: Int): String {
    delay(1000) // imitate long-running asynchronous work
    return "response $request"
}

// Maps each request number through performRequest and prints the responses.
fun main() = runBlocking<Unit> {
    (1..3).asFlow() // a flow of requests
        .map { request -> performRequest(request) }
        .collect { response -> println(response) }
}
                
                    // Suspends for one second, then returns "response <request>".
suspend fun performRequest(request: Int): String {
    delay(1000) // imitate long-running asynchronous work
    return "response $request"
}

// Sends the numbers 1..3 through performRequest and prints each response on its own line.
fun main() = runBlocking<Unit> {
    (1..3).asFlow() // a flow of requests
        .map { request -> performRequest(request) }
        .collect { response -> println(response) }
}
                  

Transform operator

                  // For every request emit a progress string first, then the result of performRequest
val requests = (1..3).asFlow() // a flow of requests
requests
    .transform { request ->
        emit("Making request $request")
        emit(performRequest(request))
    }
    .collect { println(it) }
                
                    // Emit a progress string and then the performRequest result for each request; print one per line
(1..3).asFlow() // a flow of requests
    .transform { request ->
        emit("Making request $request")
        emit(performRequest(request))
    }
    .collect { response -> println(response) }
                  

Size-limiting operators

                  // Flow that tries to emit 1, 2, 3 and prints "Finally in numbers" from its finally block.
fun numbers(): Flow<Int> = flow {
    try {
        emit(1)
        emit(2)
        println("This line will not execute")
        emit(3)
    } finally {
        println("Finally in numbers")
    }
}

// Collects only the first two values of numbers() and prints them.
fun main() = runBlocking<Unit> {
    numbers()
        .take(2) // take only the first two
        .collect { value -> println(value) }
}
                
                    // Flow whose body emits 1, 2, 3 inside try and prints "Finally in numbers" in finally.
fun numbers(): Flow<Int> = flow {
    try {
        emit(1)
        emit(2)
        println("This line will not execute")
        emit(3)
    } finally {
        println("Finally in numbers")
    }
}

// Limits numbers() to two elements with take(2) and prints each on its own line.
fun main() = runBlocking<Unit> {
    numbers()
        .take(2) // take only the first two
        .collect { value -> println(value) }
}
                  

Terminal flow operators

                  // Square the numbers 1..5 and add the squares together with the terminal reduce operator
val sum = (1..5).asFlow()
    .map { it * it } // squares of numbers from 1 to 5
    .reduce { a, b -> a + b } // sum them (terminal operator)
println(sum)
                
                    // Sum of the squares of 1..5, computed by map followed by the terminal reduce operator
val sum = (1..5).asFlow()
    .map { it * it } // squares of numbers from 1 to 5
    .reduce { a, b -> a + b } // sum them (terminal operator)
println(sum)

                  

Flows are sequential

                  // Trace each stage: keep even numbers of 1..5, map them to strings, and log every step
(1..5).asFlow()
    .filter {
        println("Filter $it")
        it % 2 == 0
    }
    .map {
        println("Map $it")
        "string $it"
    }.collect {
        println("Collect $it")
    }
                
                    // Log every stage while filtering 1..5 down to even numbers and mapping them to strings
(1..5).asFlow()
    .filter {
        println("Filter $it")
        it % 2 == 0
    }
    .map {
        println("Map $it")
        "string $it"
    }.collect {
        println("Collect $it")
    }

                  

Flow context

                  // Collect simple() from inside withContext(context) and print each value
withContext(context) {
    val source = simple()
    source.collect { println(it) } // run in the specified context
}
                
                    // Collect simple() within withContext(context), printing each value on its own line
withContext(context) {
    simple().collect { value ->
        println(value) // run in the specified context
    }
}
                  
                  // Flow that logs when it starts and then emits 1, 2, 3 (log is defined outside this snippet).
fun simple(): Flow<Int> = flow {
    log("Started simple flow")
    for (i in 1..3) {
        emit(i)
    }
}

// Collects the flow and logs every collected value.
fun main() = runBlocking<Unit> {
    simple().collect { value -> log("Collected $value") }
}
                
                    // Flow that logs "Started simple flow" and emits 1, 2, 3 (log comes from outside this snippet).
fun simple(): Flow<Int> = flow {
    log("Started simple flow")
    for (i in 1..3) {
        emit(i)
    }
}

// Collects the flow, logging "Collected <value>" for each element.
fun main() = runBlocking<Unit> {
    simple().collect { value -> log("Collected $value") }
}
                  

Wrong emission withContext

                  // Deliberately incorrect example: emits from inside withContext within the flow builder.
fun simple(): Flow<Int> = flow {
    // The WRONG way to change context for CPU-consuming code in flow builder
    kotlinx.coroutines.withContext(Dispatchers.Default) {
        for (i in 1..3) {
            Thread.sleep(100) // pretend we are computing it in CPU-consuming way
            emit(i) // emit next value
        }
    }
}

// Collects the flow above and prints whatever values it delivers.
fun main() = runBlocking<Unit> {
    simple().collect { value -> println(value) }
}
                
                    // Intentionally wrong example: the flow builder switches context with withContext before emitting.
fun simple(): Flow<Int> = flow {
    // The WRONG way to change context for CPU-consuming code in flow builder
    kotlinx.coroutines.withContext(Dispatchers.Default) {
        for (i in 1..3) {
            Thread.sleep(100) // pretend we are computing it in CPU-consuming way
            emit(i) // emit next value
        }
    }
}

// Collects the flow above, printing each delivered value on its own line.
fun main() = runBlocking<Unit> {
    simple().collect { value -> println(value) }
}
                  

flowOn operator

                  // Flow emitting 1, 2, 3 whose builder body is moved to Dispatchers.Default via flowOn.
fun simple(): Flow<Int> = flow {
    for (i in 1..3) {
        Thread.sleep(100) // pretend we are computing it in CPU-consuming way
        log("Emitting $i")
        emit(i) // emit next value
    }
}.flowOn(Dispatchers.Default) // RIGHT way to change context for CPU-consuming code in flow builder

// Collects the flow and logs each collected value (log is defined outside this snippet).
fun main() = runBlocking<Unit> {
    simple().collect { value ->
        log("Collected $value")
    }
}
                
                    // Flow emitting 1, 2, 3 with flowOn(Dispatchers.Default) applied to the builder.
fun simple(): Flow<Int> = flow {
    for (i in 1..3) {
        Thread.sleep(100) // pretend we are computing it in CPU-consuming way
        log("Emitting $i")
        emit(i) // emit next value
    }
}.flowOn(Dispatchers.Default) // RIGHT way to change context for CPU-consuming code in flow builder

// Collects the flow, logging "Collected <value>" per element (log comes from outside this snippet).
fun main() = runBlocking<Unit> {
    simple().collect { value ->
        log("Collected $value")
    }
}
                  

Buffering

                  // Flow emitting 1, 2, 3 with a 100 ms suspending delay before each value.
fun simple(): Flow<Int> = flow {
    for (i in 1..3) {
        delay(100) // pretend we are asynchronously waiting 100 ms
        emit(i) // emit next value
    }
}

// Measures how long it takes to collect the flow when each value takes 300 ms to process.
fun main() = runBlocking<Unit> {
    val time = measureTimeMillis {
        simple().collect { value ->
            delay(300) // pretend we are processing it for 300 ms
            println(value)
        }
    }
    println("Collected in $time ms")
}
                
                    // Flow emitting 1, 2, 3, suspending for 100 ms before each value.
fun simple(): Flow<Int> = flow {
    for (i in 1..3) {
        delay(100) // pretend we are asynchronously waiting 100 ms
        emit(i) // emit next value
    }
}

// Times the whole collection when the collector spends 300 ms on each value.
fun main() = runBlocking<Unit> {
    val time = measureTimeMillis {
        simple().collect { value ->
            delay(300) // pretend we are processing it for 300 ms
            println(value)
        }
    }
    println("Collected in $time ms")
}
                  
                  // Time the collection of simple() with buffer() inserted before the 300 ms collector
val time = measureTimeMillis {
    val buffered = simple().buffer() // buffer emissions, don't wait
    buffered.collect {
        delay(300) // pretend we are processing it for 300 ms
        println(it)
    }
}
println("Collected in $time ms")
                
                    // Measure collection time of simple() when buffer() sits in front of a 300 ms collector
val time = measureTimeMillis {
    simple()
        .buffer() // buffer emissions, don't wait
        .collect { value ->
            delay(300) // pretend we are processing it for 300 ms
            println(value)
        }
}
println("Collected in $time ms")
                  

Conflation

                  // Time the collection of simple() with conflate() inserted before the 300 ms collector
val time = measureTimeMillis {
    val conflated = simple().conflate() // conflate emissions, don't process each one
    conflated.collect {
        delay(300) // pretend we are processing it for 300 ms
        println(it)
    }
}
println("Collected in $time ms")
                
                    // Measure collection time of simple() when conflate() sits in front of a 300 ms collector
val time = measureTimeMillis {
    simple()
        .conflate() // conflate emissions, don't process each one
        .collect { value ->
            delay(300) // pretend we are processing it for 300 ms
            println(value)
        }
}
println("Collected in $time ms")
                  

Processing the latest value

                  // Time collectLatest over simple(); the block logs when it starts and finishes each value
val time = measureTimeMillis {
    val source = simple()
    source.collectLatest { value -> // cancel & restart on the latest value
        println("Collecting $value")
        delay(300) // pretend we are processing it for 300 ms
        println("Done $value")
    }
}
println("Collected in $time ms")
                
                    // Measure collectLatest over simple(), printing a start and a finish line per value
val time = measureTimeMillis {
    simple()
        .collectLatest { value -> // cancel & restart on the latest value
            println("Collecting $value")
            delay(300) // pretend we are processing it for 300 ms
            println("Done $value")
        }
}
println("Collected in $time ms")
                  

Zip

                  // Pair numbers with strings element by element using zip and print "<number> -> <string>"
val nums = (1..3).asFlow() // numbers 1..3
val strs = flowOf("one", "two", "three") // strings
nums.zip(strs) { a, b -> "$a -> $b" } // compose a single string
    .collect { println(it) } // collect and print
                
                    // Zip a flow of numbers with a flow of strings and print each "<number> -> <string>" pair
val nums = (1..3).asFlow() // numbers 1..3
val strs = flowOf("one", "two", "three") // strings
nums.zip(strs) { a, b -> "$a -> $b" } // compose a single string
    .collect { println(it) } // collect and print
                  

Combine

                  // Zip two flows that emit at different rates and print each pair with the elapsed time
val nums = (1..3).asFlow().onEach { delay(300) } // numbers 1..3 every 300 ms
val strs = flowOf("one", "two", "three").onEach { delay(400) } // strings every 400 ms
val startTime = System.currentTimeMillis() // remember the start time
nums.zip(strs) { a, b -> "$a -> $b" } // compose a single string with "zip"
    .collect { value -> // collect and print
        println("$value at ${System.currentTimeMillis() - startTime} ms from start")
    }
                
                    // Zip a 300 ms flow of numbers with a 400 ms flow of strings, printing elapsed time per pair
val nums = (1..3).asFlow().onEach { delay(300) } // numbers 1..3 every 300 ms
val strs = flowOf("one", "two", "three").onEach { delay(400) } // strings every 400 ms
val startTime = System.currentTimeMillis() // remember the start time
nums.zip(strs) { a, b -> "$a -> $b" } // compose a single string with "zip"
    .collect { value -> // collect and print
        println("$value at ${System.currentTimeMillis() - startTime} ms from start")
    }
                  
                  // Combine two flows that emit at different rates and print each result with the elapsed time
val nums = (1..3).asFlow().onEach { delay(300) } // numbers 1..3 every 300 ms
val strs = flowOf("one", "two", "three").onEach { delay(400) } // strings every 400 ms
val startTime = System.currentTimeMillis() // remember the start time
nums.combine(strs) { a, b -> "$a -> $b" } // compose a single string with "combine"
    .collect { value -> // collect and print
        println("$value at ${System.currentTimeMillis() - startTime} ms from start")
    }
                
                    // Combine a 300 ms flow of numbers with a 400 ms flow of strings, printing elapsed time per result
val nums = (1..3).asFlow().onEach { delay(300) } // numbers 1..3 every 300 ms
val strs = flowOf("one", "two", "three").onEach { delay(400) } // strings every 400 ms
val startTime = System.currentTimeMillis() // remember the start time
nums.combine(strs) { a, b -> "$a -> $b" } // compose a single string with "combine"
    .collect { value -> // collect and print
        println("$value at ${System.currentTimeMillis() - startTime} ms from start")
    }
                  

Flattening flows

                  // Flow for request i: emits "<i>: First", suspends for 500 ms, then emits "<i>: Second".
fun requestFlow(i: Int): Flow<String> {
    return flow {
        emit("$i: First")
        delay(500) // wait 500 ms
        emit("$i: Second")
    }
}
                
                    // Flow for request i: emits "<i>: First", waits 500 ms, then emits "<i>: Second".
fun requestFlow(i: Int): Flow<String> = flow {
    emit("$i: First")
    delay(500) // wait 500 ms
    emit("$i: Second")
}
                  
                  // Map each number of 1..3 to the flow returned by requestFlow (a flow of flows)
(1..3)
    .asFlow()
    .map { number -> requestFlow(number) }
                
                    // Turn every number of 1..3 into its requestFlow, producing a flow whose elements are flows
(1..3)
    .asFlow()
    .map { index -> requestFlow(index) }
                  

flatMapConcat

                  // Flatten the per-number request flows with flatMapConcat and print each value with elapsed time
val startTime = System.currentTimeMillis() // remember the start time
val ticks = (1..3).asFlow().onEach { delay(100) } // a number every 100 ms
ticks
    .flatMapConcat { number -> requestFlow(number) }
    .collect { value -> // collect and print
        println("$value at ${System.currentTimeMillis() - startTime} ms from start")
    }
                
                    // Flatten requestFlow results with flatMapConcat, printing each value and the elapsed time
val startTime = System.currentTimeMillis() // remember the start time
(1..3).asFlow().onEach { delay(100) } // a number every 100 ms
    .flatMapConcat { requestFlow(it) }
    .collect { value -> // collect and print
        println("$value at ${System.currentTimeMillis() - startTime} ms from start")
    }
                  

flatMapMerge

                  // Flatten the per-number request flows with flatMapMerge and print each value with elapsed time
val startTime = System.currentTimeMillis() // remember the start time
val ticks = (1..3).asFlow().onEach { delay(100) } // a number every 100 ms
ticks
    .flatMapMerge { number -> requestFlow(number) }
    .collect { value -> // collect and print
        println("$value at ${System.currentTimeMillis() - startTime} ms from start")
    }
                
                    // Flatten requestFlow results with flatMapMerge, printing each value and the elapsed time
val startTime = System.currentTimeMillis() // remember the start time
(1..3).asFlow().onEach { delay(100) } // a number every 100 ms
    .flatMapMerge { requestFlow(it) }
    .collect { value -> // collect and print
        println("$value at ${System.currentTimeMillis() - startTime} ms from start")
    }
                  

flatMapLatest

                  // Flatten the per-number request flows with flatMapLatest and print each value with elapsed time
val startTime = System.currentTimeMillis() // remember the start time
val ticks = (1..3).asFlow().onEach { delay(100) } // a number every 100 ms
ticks
    .flatMapLatest { number -> requestFlow(number) }
    .collect { value -> // collect and print
        println("$value at ${System.currentTimeMillis() - startTime} ms from start")
    }
                
                    // Flatten requestFlow results with flatMapLatest, printing each value and the elapsed time
val startTime = System.currentTimeMillis() // remember the start time
(1..3).asFlow().onEach { delay(100) } // a number every 100 ms
    .flatMapLatest { requestFlow(it) }
    .collect { value -> // collect and print
        println("$value at ${System.currentTimeMillis() - startTime} ms from start")
    }
                  

Collector try and catch

                  // Flow that prints "Emitting i" and emits i for i in 1..3.
fun simple(): Flow<Int> = flow {
    for (i in 1..3) {
        println("Emitting $i")
        emit(i) // emit next value
    }
}

// Collects with a check that fails for values above 1; the surrounding try/catch prints the failure.
fun main() = runBlocking<Unit> {
    try {
        simple().collect { value ->
            println(value)
            check(value <= 1) { "Collected $value" }
        }
    } catch (e: Throwable) {
        println("Caught $e")
    }
}
                
                    // Flow that logs "Emitting i" before emitting each i in 1..3.
fun simple(): Flow<Int> = flow {
    for (i in 1..3) {
        println("Emitting $i")
        emit(i) // emit next value
    }
}

// The collector's check fails for values above 1; the try/catch around collect prints what was caught.
fun main() = runBlocking<Unit> {
    try {
        simple().collect { value ->
            println(value)
            check(value <= 1) { "Collected $value" }
        }
    } catch (e: Throwable) {
        println("Caught $e")
    }
}
                  

Everything is caught

                  // Flow of strings: emits 1..3 and maps each to "string <value>", with a check that fails above 1.
fun simple(): Flow<String> =
    flow {
        for (i in 1..3) {
            println("Emitting $i")
            emit(i) // emit next value
        }
    }
    .map { value ->
        check(value <= 1) { "Crashed on $value" }
        "string $value"
    }

// Collects the flow inside try/catch and prints any Throwable raised during collection.
fun main() = runBlocking<Unit> {
    try {
        simple().collect { value -> println(value) }
    } catch (e: Throwable) {
        println("Caught $e")
    }
}
                
                    // Emits 1..3 and maps each to "string <value>"; the map step's check fails for values above 1.
fun simple(): Flow<String> =
    flow {
        for (i in 1..3) {
            println("Emitting $i")
            emit(i) // emit next value
        }
    }
    .map { value ->
        check(value <= 1) { "Crashed on $value" }
        "string $value"
    }

// Wraps collection in try/catch and prints whichever Throwable is caught.
fun main() = runBlocking<Unit> {
    try {
        simple().collect { value -> println(value) }
    } catch (e: Throwable) {
        println("Caught $e")
    }
}
                  

Exception transparency

                  // Replace an upstream failure with a "Caught ..." element, then print everything collected
val guarded = simple().catch { e -> emit("Caught $e") } // emit on exception
guarded.collect { println(it) }
                
                    // On an upstream exception emit "Caught <e>" instead; print each collected element on its own line
simple()
    .catch { e -> emit("Caught $e") } // emit on exception
    .collect { value -> println(value) }
                  

Transparent catch

                  // Flow that prints "Emitting i" and emits i for i in 1..3.
fun simple(): Flow<Int> = flow {
    for (i in 1..3) {
        println("Emitting $i")
        emit(i)
    }
}

// Places catch before collect; the failing check lives in the collector, downstream of catch.
fun main() = runBlocking<Unit> {
    simple()
        .catch { e -> println("Caught $e") } // does not catch downstream exceptions
        .collect { value ->
            check(value <= 1) { "Collected $value" }
            println(value)
        }
}
                
                    // Flow that logs "Emitting i" before emitting each i in 1..3.
fun simple(): Flow<Int> = flow {
    for (i in 1..3) {
        println("Emitting $i")
        emit(i)
    }
}

// catch is applied upstream of the collector, whose check fails for values above 1.
fun main() = runBlocking<Unit> {
    simple()
        .catch { e -> println("Caught $e") } // does not catch downstream exceptions
        .collect { value ->
            check(value <= 1) { "Collected $value" }
            println(value)
        }
}
                  

Catching declaratively

                  // Move the collector body into onEach so that the following catch sees its failures
val checked = simple().onEach { value ->
    check(value <= 1) { "Collected $value" }
    println(value)
}
checked
    .catch { e -> println("Caught $e") }
    .collect()
                
                    // Run the check and printing in onEach, placed before catch, then collect with no body
simple()
    .onEach { value ->
        check(value <= 1) { "Collected $value" }
        println(value)
    }
    .catch { e -> println("Caught $e") }
    .collect()
                  

Imperative finally block

                  // Flow of the numbers 1, 2, 3 built from a range.
fun simple(): Flow<Int> = (1..3).asFlow()

// Prints every collected value, then prints "Done" from a finally block.
fun main() = runBlocking<Unit> {
    try {
        simple().collect { value -> println(value) }
    } finally {
        println("Done")
    }
}
                
                    // Flow of 1, 2, 3 created from the range 1..3.
fun simple(): Flow<Int> = (1..3).asFlow()

// Collects and prints each value; the finally block prints "Done" once collection is over.
fun main() = runBlocking<Unit> {
    try {
        simple().collect { value -> println(value) }
    } finally {
        println("Done")
    }
}
                  

Declarative handling

                  // Attach an onCompletion action that prints "Done", then print every collected value
val withDone = simple().onCompletion { println("Done") }
withDone.collect { println(it) }
                
                    // Print each collected value on its own line; onCompletion prints "Done" when the flow completes
simple()
    .onCompletion { println("Done") }
    .collect { value -> println(value) }
                  
                  // Flow that emits 1 and then throws RuntimeException.
fun simple(): Flow<Int> = flow {
    emit(1)
    throw RuntimeException()
}

// onCompletion reports a non-null cause, catch then handles the exception, and collect prints values.
fun main() = runBlocking<Unit> {
    simple()
        .onCompletion { cause -> if (cause != null) println("Flow completed exceptionally") }
        .catch { cause -> println("Caught exception") }
        .collect { value -> println(value) }
}
                
                    // Flow that emits a single 1 and then throws RuntimeException.
fun simple(): Flow<Int> = flow {
    emit(1)
    throw RuntimeException()
}

// Chains onCompletion (prints when cause is non-null), catch (prints a message) and a printing collector.
fun main() = runBlocking<Unit> {
    simple()
        .onCompletion { cause -> if (cause != null) println("Flow completed exceptionally") }
        .catch { cause -> println("Caught exception") }
        .collect { value -> println(value) }
}
                  

Successful completion

                  // Flow of the numbers 1, 2, 3 built from a range.
fun simple(): Flow<Int> = (1..3).asFlow()

// onCompletion prints the cause it receives; the collector's check fails for values above 1.
fun main() = runBlocking<Unit> {
    simple()
        .onCompletion { cause -> println("Flow completed with $cause") }
        .collect { value ->
            check(value <= 1) { "Collected $value" }
            println(value)
        }
}
                
                    // Flow of 1, 2, 3 created from the range 1..3.
fun simple(): Flow<Int> = (1..3).asFlow()

// Prints the completion cause via onCompletion; the collector checks value <= 1 before printing.
fun main() = runBlocking<Unit> {
    simple()
        .onCompletion { cause -> println("Flow completed with $cause") }
        .collect { value ->
            check(value <= 1) { "Collected $value" }
            println(value)
        }
}
                  

Launching flow

                  // Imitate a flow of events: 1, 2, 3 with a 100 ms delay applied to each element
fun events(): Flow<Int> = (1..3).asFlow().onEach { delay(100) }

// Prints every event via onEach and collects in place, so "Done" is printed after collection.
fun main() = runBlocking<Unit> {
    events()
        .onEach { event -> println("Event: $event") }
        .collect() // <--- Collecting the flow waits
    println("Done")
}
                
                    // Imitate a flow of events: the numbers 1, 2, 3, each delayed by 100 ms
fun events(): Flow<Int> = (1..3).asFlow().onEach { delay(100) }

// Prints each event in onEach, collects in place, and only then prints "Done".
fun main() = runBlocking<Unit> {
    events()
        .onEach { event -> println("Event: $event") }
        .collect() // <--- Collecting the flow waits
    println("Done")
}
                  
                  // Launches event printing in the runBlocking scope via launchIn, then prints "Done".
fun main() = runBlocking<Unit> {
    val printing = events().onEach { event -> println("Event: $event") }
    printing.launchIn(this) // <--- Launching the flow in a separate coroutine
    println("Done")
}
                
                    // Starts collecting events() with launchIn(this) and prints "Done" right after launching.
fun main() = runBlocking<Unit> {
    events()
        .onEach { event -> println("Event: $event") }
        .launchIn(this) // <--- Launching the flow in a separate coroutine
    println("Done")
}
                  

Flow cancellation checks

                  // Flow that prints "Emitting i" and emits i for i in 1..5.
fun foo(): Flow<Int> = flow {
    for (i in 1..5) {
        println("Emitting $i")
        emit(i)
    }
}

// Collects foo(), calling cancel() on the runBlocking scope when the value 3 arrives.
fun main() = runBlocking<Unit> {
    foo().collect { value ->
        if (value == 3) cancel()
        println(value)
    }
}
                
                    // Flow that logs "Emitting i" before emitting each i in 1..5.
fun foo(): Flow<Int> = flow {
    for (i in 1..5) {
        println("Emitting $i")
        emit(i)
    }
}

// Collects foo() and calls cancel() on the runBlocking scope once the value 3 is seen.
fun main() = runBlocking<Unit> {
    foo().collect { value ->
        if (value == 3) cancel()
        println(value)
    }
}
                  
                  // Collects the range 1..5 as a flow and calls cancel() on the scope when 3 arrives.
fun main() = runBlocking<Unit> {
    val numbers = (1..5).asFlow()
    numbers.collect {
        if (it == 3) cancel()
        println(it)
    }
}
                
                    // Collects 1..5 as a flow; cancel() is called on the runBlocking scope at value 3.
fun main() = runBlocking<Unit> {
    (1..5).asFlow().collect { value ->
        if (value == 3) cancel()
        println(value)
    }
}
                  

Making busy flow cancellable

                  // Same collection as a plain range flow, but with cancellable() applied before collecting.
fun main() = runBlocking<Unit> {
    val numbers = (1..5).asFlow().cancellable()
    numbers.collect {
        if (it == 3) cancel()
        println(it)
    }
}
                
                    // Collects 1..5 through cancellable(); cancel() is called on the scope at value 3.
fun main() = runBlocking<Unit> {
    (1..5).asFlow().cancellable().collect { value ->
        if (value == 3) cancel()
        println(value)
    }
}